1use super::archive::*;
2use super::consts::*;
3use super::reader::*;
4use super::segmenter::*;
5use crate::ext::io::*;
6use crate::ext::mutex::*;
7use crate::scripts::base::*;
8use crate::types::*;
9use crate::utils::encoding::*;
10use crate::utils::threadpool::ThreadPool;
11use anyhow::Result;
12use sha2::{Digest, Sha256};
13use std::collections::{BTreeMap, HashMap, HashSet};
14use std::io::{Seek, Write};
15use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
16use std::sync::{Arc, Mutex};
17
/// Location and size of one unique segment stored in the archive body.
///
/// Entries are keyed by the SHA-256 of the uncompressed segment in the writer's
/// segment table, so identical data is stored once and referenced many times.
#[derive(Clone, Debug, PartialEq, Eq)]
struct WrittenSegment {
    /// Whether the stored bytes went through the configured compressor.
    is_compressed: bool,
    /// Offset in the output stream at which the stored bytes begin.
    /// Holds `0` as a placeholder until the segment has actually been written.
    start: u64,
    /// Length of the segment before compression.
    original_size: u64,
    /// Length of the segment as stored in the archive.
    archived_size: u64,
}
25
/// Running counters describing what has been packed so far.
///
/// Every field is atomic so packing tasks can update them through a shared reference.
#[derive(Default)]
struct Stats {
    /// Sum of the uncompressed sizes of all segments seen (duplicates included).
    total_original_size: AtomicU64,
    /// Sum of the stored sizes of the segments that were actually written.
    final_archive_size: AtomicU64,
    /// Number of segments seen (duplicates included).
    total_segments: AtomicUsize,
    /// Number of segments that were actually written.
    unique_segments: AtomicUsize,
    /// Sum of the stored sizes of segments that were skipped as duplicates.
    deduplication_savings: AtomicU64,
}

impl std::fmt::Display for Stats {
    /// Renders the five counters as a five-line, human-readable report
    /// (no trailing newline).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let bytes = |counter: &AtomicU64| counter.load(Ordering::Relaxed);
        let count = |counter: &AtomicUsize| counter.load(Ordering::Relaxed);

        // Snapshot every counter before formatting anything.
        let original = bytes(&self.total_original_size);
        let archived = bytes(&self.final_archive_size);
        let seen = count(&self.total_segments);
        let unique = count(&self.unique_segments);
        let saved = bytes(&self.deduplication_savings);

        write!(
            f,
            "Total Original Size: {} bytes\nFinal Archive Size: {} bytes\nTotal Segments: {}\nUnique Segments: {}\nDeduplication Savings: {} bytes",
            original, archived, seen, unique, saved
        )
    }
}
63
/// Writer that packs files into an XP3 archive on top of any seekable output stream.
///
/// Entry data is consumed by background tasks; the index is emitted by
/// `write_header` once all of them have finished.
pub struct Xp3ArchiveWriter<T: Write + Seek> {
    /// Output stream, shared with the packing tasks; every append goes through this lock.
    file: Arc<Mutex<T>>,
    /// Segments already stored (or reserved), keyed by the SHA-256 of the uncompressed data.
    segments: Arc<Mutex<HashMap<[u8; 32], WrittenSegment>>>,
    /// Archive entries keyed by name; the index is written in this map's iteration order.
    items: Arc<Mutex<BTreeMap<String, ArchiveItem>>>,
    /// Pool that runs one packing task per entry.
    runner: ThreadPool<Result<()>>,
    /// Whether entry data is compressed before being stored.
    compress_files: bool,
    /// Whether the index is compressed before being stored.
    compress_index: bool,
    /// Compression level passed to the zlib encoder.
    zlib_compression_level: u32,
    /// Optional splitter for entry data; when absent each entry is stored as one segment.
    segmenter: Option<Arc<Box<dyn Segmenter + Send + Sync>>>,
    /// Counters printed to stderr after the index has been written.
    stats: Arc<Stats>,
    /// Number of threads in the per-entry compression pool.
    compress_workers: usize,
    /// Hashes of segments that are reserved in `segments` but not fully written yet.
    processing_segments: Arc<Mutex<HashSet<[u8; 32]>>>,
    /// Use the zstd encoder instead of zlib when compressing.
    use_zstd: bool,
    /// Compression level passed to the zstd encoder.
    zstd_compression_level: i32,
    /// Write `0` instead of the entry checksum into the `adlr` chunk.
    no_adler: bool,
    /// Use the zopfli zlib encoder when compressing (checked before `use_zstd`).
    #[cfg(feature = "zopfli")]
    use_zopfli: bool,
    /// Forwarded to `zopfli::Options::iteration_count`.
    #[cfg(feature = "zopfli")]
    zopfli_iteration_count: std::num::NonZeroU64,
    /// Forwarded to `zopfli::Options::iterations_without_improvement`.
    #[cfg(feature = "zopfli")]
    zopfli_iterations_without_improvement: std::num::NonZeroU64,
    /// Forwarded to `zopfli::Options::maximum_block_splits`.
    #[cfg(feature = "zopfli")]
    zopfli_maximum_block_splits: u16,
}
88
89impl Xp3ArchiveWriter<std::io::BufWriter<std::fs::File>> {
90 pub fn new(filename: &str, files: &[&str], config: &ExtraConfig) -> Result<Self> {
91 let file = std::fs::File::create(filename)?;
92 let mut file = std::io::BufWriter::new(file);
93 let mut items = BTreeMap::new();
94 for file in files {
95 let item = ArchiveItem {
96 name: file.to_string(),
97 file_hash: 0,
98 original_size: 0,
99 archived_size: 0,
100 segments: Vec::new(),
101 };
102 items.insert(file.to_string(), item);
103 }
104 let segmenter = create_segmenter(&config.xp3_segmenter).map(|s| Arc::new(s));
105 file.write_all(XP3_MAGIC)?;
106 file.write_u64(0)?; Ok(Self {
108 file: Arc::new(Mutex::new(file)),
109 segments: Arc::new(Mutex::new(HashMap::new())),
110 items: Arc::new(Mutex::new(items)),
111 runner: ThreadPool::new(
112 if config.xp3_segmenter.is_none() {
113 1
114 } else {
115 config.xp3_pack_workers.max(1)
116 },
117 Some("xp3-writer"),
118 false,
119 )?,
120 compress_files: config.xp3_compress_files,
121 compress_index: config.xp3_compress_index,
122 zlib_compression_level: config.zlib_compression_level,
123 segmenter,
124 stats: Arc::new(Stats::default()),
125 compress_workers: config.xp3_compress_workers.max(1),
126 processing_segments: Arc::new(Mutex::new(HashSet::new())),
127 use_zstd: config.xp3_zstd,
128 zstd_compression_level: config.zstd_compression_level,
129 no_adler: config.xp3_no_adler,
130 #[cfg(feature = "zopfli")]
131 use_zopfli: config.xp3_zopfli,
132 #[cfg(feature = "zopfli")]
133 zopfli_iteration_count: config.zopfli_iteration_count,
134 #[cfg(feature = "zopfli")]
135 zopfli_iterations_without_improvement: config.zopfli_iterations_without_improvement,
136 #[cfg(feature = "zopfli")]
137 zopfli_maximum_block_splits: config.zopfli_maximum_block_splits,
138 })
139 }
140}
141
/// Seekable adapter over a forward-only writer.
///
/// All writes and seeks operate on an in-memory buffer; the buffered bytes are
/// pushed to `inner` in one go when the value is dropped.
struct Writer<'a> {
    /// Destination that receives the buffered bytes on drop.
    inner: Box<dyn Write + 'a>,
    /// In-memory buffer that serves every write and seek.
    mem: MemWriter,
}
146
147impl std::fmt::Debug for Writer<'_> {
148 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
149 f.debug_struct("Writer").field("mem", &self.mem).finish()
150 }
151}
152
153impl<'a> Write for Writer<'a> {
154 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
155 self.mem.write(buf)
156 }
157
158 fn flush(&mut self) -> std::io::Result<()> {
159 self.mem.flush()
160 }
161}
162
163impl<'a> Seek for Writer<'a> {
164 fn seek(&mut self, pos: std::io::SeekFrom) -> std::io::Result<u64> {
165 self.mem.seek(pos)
166 }
167
168 fn stream_position(&mut self) -> std::io::Result<u64> {
169 self.mem.stream_position()
170 }
171
172 fn rewind(&mut self) -> std::io::Result<()> {
173 self.mem.rewind()
174 }
175}
176
177impl<'a> Drop for Writer<'a> {
178 fn drop(&mut self) {
179 let _ = self.inner.write_all(&self.mem.data);
180 let _ = self.inner.flush();
181 }
182}
183
184impl<T: Write + Seek + Sync + Send + 'static> Archive for Xp3ArchiveWriter<T> {
185 fn new_file<'a>(
186 &'a mut self,
187 name: &str,
188 size: Option<u64>,
189 ) -> Result<Box<dyn WriteSeek + 'a>> {
190 let inner = self.new_file_non_seek(name, size)?;
191 Ok(Box::new(Writer {
192 inner,
193 mem: MemWriter::new(),
194 }))
195 }
196
197 fn new_file_non_seek<'a>(
198 &'a mut self,
199 name: &str,
200 _size: Option<u64>,
201 ) -> Result<Box<dyn Write + 'a>> {
202 if self.segmenter.is_none() {
203 self.runner.join();
204 }
205 for err in self.runner.take_results() {
206 err?;
207 }
208 let item = {
209 let items = self.items.lock_blocking();
210 Arc::new(Mutex::new(
211 items
212 .get(name)
213 .ok_or_else(|| anyhow::anyhow!("File not found in archive: {}", name))?
214 .clone(),
215 ))
216 };
217 let (reader, writer) = std::io::pipe()?;
218 let reader = Reader::new(reader);
219 {
220 let file = self.file.clone();
221 let segments = self.segments.clone();
222 let items = self.items.clone();
223 let segmenter = self.segmenter.clone();
224 let stats = self.stats.clone();
225 let is_compressed = self.compress_files;
226 let zlib_compression_level = self.zlib_compression_level;
227 let workers = if self.segmenter.is_some() && is_compressed {
228 Some(Arc::new(ThreadPool::<Result<()>>::new(
229 self.compress_workers,
230 Some("xp3-compress"),
231 false,
232 )?))
233 } else {
234 None
235 };
236 let processiong_segments = self.processing_segments.clone();
237 let use_zstd = self.use_zstd;
238 #[cfg(feature = "zopfli")]
239 let use_zopfli = self.use_zopfli;
240 #[cfg(feature = "zopfli")]
241 let zopfli_iteration_count = self.zopfli_iteration_count;
242 #[cfg(feature = "zopfli")]
243 let zopfli_iterations_without_improvement = self.zopfli_iterations_without_improvement;
244 #[cfg(feature = "zopfli")]
245 let zopfli_maximum_block_splits = self.zopfli_maximum_block_splits;
246 let zstd_compression_level = self.zstd_compression_level;
247 let name = name.to_owned();
248 self.runner.execute(
249 move |_| {
250 let mut reader = reader;
251 let mut offset_in_file = 0u64;
252 if let Some(segmenter) = segmenter {
253 for seg in segmenter.segment(&mut reader, &name) {
254 let seg = seg?;
255 let hash: [u8; 32] = Sha256::digest(&seg).into();
256 let seg_offset_in_file = offset_in_file;
257 offset_in_file += seg.len() as u64;
258 let fseg = match {
259 let mut segments = segments.lock_blocking();
260 if let Some(old_seg) = segments.get(&hash) {
261 Err(old_seg.clone())
262 } else {
263 let seg_data = WrittenSegment {
264 is_compressed,
265 start: 0,
266 original_size: seg.len() as u64,
267 archived_size: seg.len() as u64,
268 };
269 segments.insert(hash, seg_data.clone());
270 Ok(seg_data)
271 }
272 } {
273 Ok(mut info) => {
274 if let Some(workers) = workers.as_ref() {
275 {
276 let mut processing =
277 processiong_segments.lock_blocking();
278 processing.insert(hash);
279 }
280 let file = file.clone();
281 let segments = segments.clone();
282 let stats = stats.clone();
283 let item = item.clone();
284 let processiong_segments = processiong_segments.clone();
285 workers.execute(
286 move |_| {
287 let data = {
288 if use_zopfli {
289 let option = zopfli::Options {
290 iteration_count: zopfli_iteration_count,
291 iterations_without_improvement:
292 zopfli_iterations_without_improvement,
293 maximum_block_splits:
294 zopfli_maximum_block_splits,
295 };
296 let mut e = zopfli::ZlibEncoder::new(option, zopfli::BlockType::Dynamic, Vec::new())?;
297 e.write_all(&seg)?;
298 e.finish()?
299 } else if use_zstd {
300 let mut e = zstd::stream::Encoder::new(
301 Vec::new(),
302 zstd_compression_level,
303 )?;
304 e.write_all(&seg)?;
305 e.finish()?
306 } else {
307 let mut e = flate2::write::ZlibEncoder::new(
308 Vec::new(),
309 flate2::Compression::new(
310 zlib_compression_level,
311 ),
312 );
313 e.write_all(&seg)?;
314 e.finish()?
315 }
316 };
317 let mut file = file.lock_blocking();
318 let start = file.seek(std::io::SeekFrom::End(0))?;
319 file.write_all(&data)?;
320 info.start = start;
321 info.archived_size = data.len() as u64;
322 let stats = stats.clone();
323 stats.total_original_size.fetch_add(
324 info.original_size,
325 Ordering::Relaxed,
326 );
327 stats.final_archive_size.fetch_add(
328 info.archived_size,
329 Ordering::Relaxed,
330 );
331 stats
332 .total_segments
333 .fetch_add(1, Ordering::Relaxed);
334 stats
335 .unique_segments
336 .fetch_add(1, Ordering::Relaxed);
337 let mut segments = segments.lock_blocking();
338 segments.insert(hash, info.clone());
339 let ninfo = Segment {
340 is_compressed: info.is_compressed,
341 start: info.start,
342 offset_in_file: seg_offset_in_file,
343 original_size: info.original_size,
344 archived_size: info.archived_size,
345 };
346 let mut item = item.lock_blocking();
347 item.original_size += ninfo.original_size;
348 item.archived_size += ninfo.archived_size;
349 item.segments.push(ninfo);
350 let mut processing =
351 processiong_segments.lock_blocking();
352 processing.remove(&hash);
353 Ok(())
354 },
355 true,
356 )?;
357 None
358 } else {
359 {
360 let mut processing =
361 processiong_segments.lock_blocking();
362 processing.insert(hash);
363 }
364 let data = seg;
365 let mut file = file.lock_blocking();
366 let start = file.seek(std::io::SeekFrom::End(0))?;
367 file.write_all(&data)?;
368 info.start = start;
369 info.archived_size = data.len() as u64;
370 let stats = stats.clone();
371 stats
372 .total_original_size
373 .fetch_add(info.original_size, Ordering::Relaxed);
374 stats
375 .final_archive_size
376 .fetch_add(info.archived_size, Ordering::Relaxed);
377 stats.total_segments.fetch_add(1, Ordering::Relaxed);
378 stats.unique_segments.fetch_add(1, Ordering::Relaxed);
379 let mut segments = segments.lock_blocking();
380 segments.insert(hash, info.clone());
381 let ninfo = Segment {
382 is_compressed: info.is_compressed,
383 start: info.start,
384 offset_in_file: seg_offset_in_file,
385 original_size: info.original_size,
386 archived_size: info.archived_size,
387 };
388 {
389 let mut processing =
390 processiong_segments.lock_blocking();
391 processing.remove(&hash);
392 }
393 Some(ninfo)
394 }
395 }
396 Err(mut seg_info) => {
397 let mut need_update = false;
398 loop {
399 if {
400 let processing = processiong_segments.lock_blocking();
401 !processing.contains(&hash)
402 } {
403 break;
404 }
405 need_update = true;
406 std::thread::sleep(std::time::Duration::from_millis(10));
407 }
408 if need_update {
409 seg_info = {
410 let segments = segments.lock_blocking();
411 segments
412 .get(&hash)
413 .ok_or(anyhow::anyhow!(
414 "Failed to get latest segment info."
415 ))?
416 .clone()
417 };
418 }
419 let stats = stats.clone();
420 stats
421 .total_original_size
422 .fetch_add(seg_info.original_size, Ordering::Relaxed);
423 stats
424 .deduplication_savings
425 .fetch_add(seg_info.archived_size, Ordering::Relaxed);
426 stats.total_segments.fetch_add(1, Ordering::Relaxed);
427 let ninfo = Segment {
428 is_compressed: seg_info.is_compressed,
429 start: seg_info.start,
430 offset_in_file: seg_offset_in_file,
431 original_size: seg_info.original_size,
432 archived_size: seg_info.archived_size,
433 };
434 Some(ninfo)
435 }
436 };
437 if let Some(fseg) = fseg {
438 let mut item = item.lock_blocking();
439 item.original_size += fseg.original_size;
440 item.archived_size += fseg.archived_size;
441 item.segments.push(fseg);
442 }
443 }
444 } else {
445 let mut file = file.lock_blocking();
446 let start = file.seek(std::io::SeekFrom::End(0))?;
447 let size = {
448 let mut writer = if is_compressed {
449 if use_zopfli {
450 let e = zopfli::ZlibEncoder::new(
451 zopfli::Options {
452 iteration_count: zopfli_iteration_count,
453 iterations_without_improvement:
454 zopfli_iterations_without_improvement,
455 maximum_block_splits: zopfli_maximum_block_splits,
456 },
457 zopfli::BlockType::Dynamic,
458 &mut *file,
459 )?;
460 Box::new(e) as Box<dyn Write>
461 } else if use_zstd {
462 let e = zstd::stream::Encoder::new(
463 &mut *file,
464 zstd_compression_level,
465 )?.auto_finish();
466 Box::new(e) as Box<dyn Write>
467 } else {
468 let e = flate2::write::ZlibEncoder::new(
469 &mut *file,
470 flate2::Compression::new(zlib_compression_level),
471 );
472 Box::new(e) as Box<dyn Write>
473 }
474 } else {
475 Box::new(&mut *file) as Box<dyn Write>
476 };
477 std::io::copy(&mut reader, &mut writer)?
478 };
479 let ninfo = Segment {
480 is_compressed,
481 start,
482 offset_in_file: 0,
483 original_size: size,
484 archived_size: if is_compressed {
485 file.stream_position()? - start
486 } else {
487 size
488 },
489 };
490 let mut item = item.lock_blocking();
491 item.original_size += ninfo.original_size;
492 item.archived_size += ninfo.archived_size;
493 let stats = stats.clone();
494 stats
495 .total_original_size
496 .fetch_add(ninfo.original_size, Ordering::Relaxed);
497 stats
498 .final_archive_size
499 .fetch_add(ninfo.archived_size, Ordering::Relaxed);
500 stats.total_segments.fetch_add(1, Ordering::Relaxed);
501 stats.unique_segments.fetch_add(1, Ordering::Relaxed);
502 item.segments.push(ninfo);
503 }
504 if let Some(workers) = workers {
505 workers.join();
506 for err in workers.take_results() {
507 err?;
508 }
509 }
510 let mut item = item.lock_blocking().to_owned();
511 item.file_hash = reader.into_checksum();
512 item.segments.sort_by_key(|s| s.offset_in_file);
513 let mut items = items.lock_blocking();
514 items.insert(item.name.clone(), item);
515 Ok(())
516 },
517 true,
518 )?;
519 }
520 Ok(Box::new(writer))
521 }
522
523 fn write_header(&mut self) -> Result<()> {
524 self.runner.join();
525 for err in self.runner.take_results() {
526 err?;
527 }
528 let mut file = self.file.lock_blocking();
529 let index_offset = file.seek(std::io::SeekFrom::End(0))?;
530 let mut index_data = MemWriter::new();
531 let items = self.items.lock_blocking();
532 for (_, item) in items.iter() {
533 let mut file_chunk = MemWriter::new();
534 let name = encode_string(Encoding::Utf16LE, &item.name, false)?;
535 let info_data_size = name.len() as u64 + 22;
536 file_chunk.write_all(CHUNK_INFO)?;
537 file_chunk.write_u64(info_data_size)?;
538 file_chunk.write_u32(0)?; file_chunk.write_u64(item.original_size)?;
540 file_chunk.write_u64(item.archived_size)?;
541 file_chunk.write_u16(name.len() as u16 / 2)?;
542 file_chunk.write_all(&name)?;
543 let segm_data_size = item.segments.len() as u64 * 28;
544 file_chunk.write_all(CHUNK_SEGM)?;
545 file_chunk.write_u64(segm_data_size)?;
546 for seg in &item.segments {
547 let flag = if seg.is_compressed {
548 TVP_XP3_SEGM_ENCODE_ZLIB
549 } else {
550 TVP_XP3_SEGM_ENCODE_RAW
551 };
552 file_chunk.write_u32(flag)?;
553 file_chunk.write_u64(seg.start)?;
554 file_chunk.write_u64(seg.original_size)?;
555 file_chunk.write_u64(seg.archived_size)?;
556 }
557 let adlr_data_size = 4;
558 file_chunk.write_all(CHUNK_ADLR)?;
559 file_chunk.write_u64(adlr_data_size)?;
560 if self.no_adler {
561 file_chunk.write_u32(0)?;
562 } else {
563 file_chunk.write_u32(item.file_hash)?;
564 }
565 index_data.write_all(CHUNK_FILE)?;
566 let file_chunk = file_chunk.into_inner();
567 index_data.write_u64(file_chunk.len() as u64)?;
568 index_data.write_all(&file_chunk)?;
569 }
570 let index_data = index_data.into_inner();
571 if self.compress_index {
572 let compressed_index = if self.use_zopfli {
573 let option = zopfli::Options {
574 iteration_count: self.zopfli_iteration_count,
575 iterations_without_improvement: self.zopfli_iterations_without_improvement,
576 maximum_block_splits: self.zopfli_maximum_block_splits,
577 };
578 let mut e =
579 zopfli::ZlibEncoder::new(option, zopfli::BlockType::Dynamic, Vec::new())?;
580 e.write_all(&index_data)?;
581 e.finish()?
582 } else if self.use_zstd {
583 let mut e = zstd::stream::Encoder::new(Vec::new(), self.zstd_compression_level)?;
584 e.write_all(&index_data)?;
585 e.finish()?
586 } else {
587 let mut e = flate2::write::ZlibEncoder::new(
588 Vec::new(),
589 flate2::Compression::new(self.zlib_compression_level),
590 );
591 e.write_all(&index_data)?;
592 e.finish()?
593 };
594 file.write_u8(TVP_XP3_INDEX_ENCODE_ZLIB)?;
595 file.write_u64(compressed_index.len() as u64)?;
596 file.write_u64(index_data.len() as u64)?;
597 file.write_all(&compressed_index)?;
598 } else {
599 file.write_u8(TVP_XP3_INDEX_ENCODE_RAW)?;
600 file.write_u64(index_data.len() as u64)?;
601 file.write_all(&index_data)?;
602 }
603 file.write_u64_at(11, index_offset)?; file.flush()?;
605 eprintln!("XP3 Archive Statistics:\n{}", self.stats);
606 Ok(())
607 }
608}